Library Imports

from pyspark.sql import SparkSession
from pyspark.sql import types as T

from pyspark.sql import functions as F

from datetime import datetime
from decimal import Decimal

Template

spark = (
    SparkSession.builder
    .master("local")
    .appName("Section 2.10 - Spark Functions aren't Enough, I Need my Own!")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

sc = spark.sparkContext

import os

data_path = "/data/pets.csv"
base_path = os.path.dirname(os.getcwd())
path = base_path + data_path
pets = spark.read.csv(path, header=True)
pets.toPandas()
   id breed_id nickname             birthday age  color
0   1        1     King  2014-11-22 12:30:31   5  brown
1   2        3    Argus  2016-11-22 10:05:10  10   None
2   3        1   Chewie  2016-11-22 10:05:10  15   None
3   3        2    Maple  2018-11-22 10:05:10  17  white

Spark Functions aren't Enough, I Need my Own!

What happens when you can't find a function that does what you need? Try looking again 🤪. But if that really is the case, your last resort is to implement a udf, short for user defined function.

These are functions written in Python that take one or more columns as input and return a new column. There are multiple steps in creating a udf; we'll walk through one and decompose it step by step.

Option 1: Steps

# Step 1: Create your own function
def uppercase_words(word, cutoff_length=2):
    return word.upper()[:cutoff_length] if word else None

s = 'Hello World!'
print(s)
print(uppercase_words(s, 20))

# Step 2: Register the udf as a spark udf
uppercase_words_udf = F.udf(uppercase_words, T.StringType())

# Step 3: Use it!
(
    pets
    .withColumn('nickname_uppercase', uppercase_words_udf(F.col('nickname')))
    .withColumn('color_uppercase', uppercase_words_udf(F.col('color')))
    .withColumn('color_uppercase_trimmed', uppercase_words_udf(F.col('color'), F.lit(3)))
    .toPandas()
)
Hello World!
HELLO WORLD!
   id breed_id nickname             birthday age  color nickname_uppercase color_uppercase color_uppercase_trimmed
0   1        1     King  2014-11-22 12:30:31   5  brown                 KI              BR                     BRO
1   2        3    Argus  2016-11-22 10:05:10  10   None                 AR            None                    None
2   3        1   Chewie  2016-11-22 10:05:10  15   None                 CH            None                    None
3   3        2    Maple  2018-11-22 10:05:10  17  white                 MA              WH                     WHI

What Happened?

Although an upper function already exists in the Spark functions library, this still serves as a good example. Let's break down the steps involved:

  1. Create the function that you want (uppercase_words), remembering that only Spark column objects are accepted as input arguments. This means that if you want to pass in other values, you will need to wrap them in a column object using F.lit() from the previous sections.
  2. Register the Python function as a Spark function, specifying the Spark return type. The format is F.udf(python_function, spark_return_type). A sketch with a different return type follows this list.
  3. Now you can use the function!
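
The declared return type matters: if your function returns something other than a string, register it with the matching Spark type. Below is a minimal sketch using a hypothetical nickname_length helper (not part of the code above) that returns an integer and is therefore registered with T.IntegerType():

# Hypothetical helper: returns an integer, so we register it with
# T.IntegerType() instead of T.StringType().
def nickname_length(word):
    return len(word) if word else None

nickname_length_udf = F.udf(nickname_length, T.IntegerType())

(
    pets
    .withColumn('nickname_length', nickname_length_udf(F.col('nickname')))
    .toPandas()
)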

Option 2: 1 Less Step

from pyspark.sql.functions import udf

# Step 1: Create and register your own function
@udf('string')
def uppercase_words(word, cutoff_length=2):
    return word.upper()[:cutoff_length] if word else None

# Step 2: Use it!
(
    pets
    .withColumn('color_uppercase', uppercase_words(F.col('color')))
    .toPandas()
)
   id breed_id nickname             birthday age  color color_uppercase
0   1        1     King  2014-11-22 12:30:31   5  brown              BR
1   2        3    Argus  2016-11-22 10:05:10  10   None            None
2   3        1   Chewie  2016-11-22 10:05:10  15   None            None
3   3        2    Maple  2018-11-22 10:05:10  17  white              WH

What Happened?

The udf function can also be used as a decorator to register your Python function as a Spark function in a single step.

Note that the argument to the decorator is the Spark return type of the udf (here the DDL string 'string'), not the types of its arguments.
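
The decorator accepts either a DDL type string (as above) or a DataType object. As a minimal sketch, the same hypothetical nickname_length helper from earlier can be created and registered in one step:

# Sketch: the same decorator pattern, but passing a DataType object as the return type.
@F.udf(T.IntegerType())
def nickname_length(word):
    return len(word) if word else None

(
    pets
    .withColumn('nickname_length', nickname_length(F.col('nickname')))
    .toPandas()
)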

The Ugly Part of udfs

TL;DR Spark functions are executed on the JVM, while Python UDFs are executed in Python. This requires extra Python memory for your Spark application (this will be explained in Chapter 6) and more passing of data between the JVM and Python.

If what you need can be done with the built-in Spark functions, you should always use them; udfs perform very poorly in comparison. This is a great response that encapsulates the reason why:

The main reasons are already enumerated above and can be reduced to a simple fact that Spark DataFrame is natively a JVM structure and standard access methods are implemented by simple calls to Java API. UDF from the other hand are implemented in Python and require moving data back and forth.

While PySpark in general requires data movements between JVM and Python, in case of low level RDD API it typically doesn't require expensive serde activity. Spark SQL adds additional cost of serialization and deserialization as well as the cost of moving data from and to the unsafe representation on the JVM. The latter one is specific to all UDFs (Python, Scala and Java) but the former one is specific to non-native languages.

Unlike UDFs, Spark SQL functions operate directly on the JVM and typically are well integrated with both Catalyst and Tungsten. It means these can be optimized in the execution plan and most of the time can benefit from codegen and other Tungsten optimizations. Moreover these can operate on data in its "native" representation.

So in a sense the problem here is that Python UDF has to bring data to the code while SQL expressions go the other way around.

source: https://stackoverflow.com/questions/38296609/spark-functions-vs-udf-performance
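
To make the comparison concrete, the uppercase-and-truncate logic from Option 1 can be expressed entirely with built-in functions such as F.upper and F.substring, so all the work stays on the JVM. A quick sketch:

# Same result as the udf versions, using only built-in Spark functions.
(
    pets
    .withColumn('nickname_uppercase', F.substring(F.upper(F.col('nickname')), 1, 2))
    .withColumn('color_uppercase_trimmed', F.substring(F.upper(F.col('color')), 1, 3))
    .toPandas()
)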

Option 3: Pandas Vectorized UDFs

# TODO: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
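
Until this section is written, here is a minimal sketch of a vectorized (pandas) udf, assuming a Spark version that supports type-hinted pandas_udf (Spark 3.x) and that pyarrow is installed. The function receives and returns a pandas Series, so the Python overhead is paid once per batch instead of once per row:

import pandas as pd
from pyspark.sql.functions import pandas_udf

# Sketch: a vectorized version of uppercase_words that operates on a whole
# pandas Series at a time.
@pandas_udf('string')
def uppercase_words_pandas(words: pd.Series) -> pd.Series:
    return words.str.upper().str[:2]

(
    pets
    .withColumn('nickname_uppercase', uppercase_words_pandas(F.col('nickname')))
    .toPandas()
)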

Summary

  • We learnt how to use our own Python functions within Spark by registering them as udfs.
  • We learnt how to pass non-column values into a udf using F.lit() from previous chapters.
  • We learnt about the downsides of udfs and their performance issues.
